﻿using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using QQ2564874169.Core;
using QQ2564874169.Core.Utils;
using QQ2564874169.Json;

namespace QQ2564874169.RelationalSql.Trigger
{
    /// <summary>
    /// Polls the <c>TriggerItem</c> table for unprocessed rows on a pool of background workers and
    /// publishes every row through <see cref="Receive"/>. After <see cref="LoadService"/> has been
    /// called, rows are additionally dispatched to the registered watch services.
    /// </summary>
    /// <remarks>
    /// <see cref="Stop"/> blocks until every worker has left its loop, so it must not be called
    /// from inside a <see cref="Receive"/> handler or a watch service (those run on a worker).
    /// </remarks>
    public class TriggerSubscriber
    {
        /// <summary>Raised for every item read from the table, inside the item's transaction.</summary>
        public event EventHandler<TriggerReceiveEventArgs> Receive;

        /// <summary>Raised when handling an item (handlers, the processed update or the commit) throws.</summary>
        public event EventHandler<TriggerReceiveErrorEventArgs> ReceiveError;

        /// <summary>Raised when a worker iteration fails outside of item handling; the handler may set <c>ExitTask</c> to end that worker.</summary>
        public event EventHandler<TriggerSubscriberErrorEventArgs> SubscriberError;

        /// <summary>Raised when invoking a watch service throws.</summary>
        public event EventHandler<TriggerWatchErrorEventArgs> WatchError;

        /// <summary>
        /// Maps each generic watch interface definition to the trigger type it handles.
        /// Used both when registering services and when dispatching an item.
        /// </summary>
        private static readonly Dictionary<Type, TriggerType> WatchInterfaces = new Dictionary<Type, TriggerType>
        {
            {typeof(IWatchInsertAfter<>), TriggerType.Insert},
            {typeof(IWatchDeleteAfter<>), TriggerType.Delete},
            {typeof(IWatchUpdateAfter<>), TriggerType.Update},
            {typeof(IWatchSetNullAfter<>), TriggerType.SetNull},
            {typeof(IWatchProcAfter<>), TriggerType.Proc}
        };

        // Private gate instead of lock(this): outside code can lock on a public instance and interfere.
        private readonly object _sync = new object();
        private readonly ITypeContainer _container;

        // Registered services keyed by the full name of the watched table type. Guarded by _sync.
        private readonly Dictionary<string, List<ServiceSetting>> _settings =
            new Dictionary<string, List<ServiceSetting>>();

        // Services implementing IWatchAfter (notified for every item). Guarded by _sync.
        private readonly List<Type> _watchAllTypes = new List<Type>();

        // Both flags are written and read from different threads, hence volatile.
        private volatile bool _running;
        private volatile int _executing;

        /// <summary>Creates a subscriber that resolves its dependencies from <paramref name="container"/>.</summary>
        /// <param name="container">Root container; a child container is created per worker iteration and per service call.</param>
        public TriggerSubscriber(ITypeContainer container)
        {
            _container = container;
        }

        /// <summary>Turns the serialized entity of an item into an instance of <paramref name="type"/>.</summary>
        /// <param name="data">Serialized entity text.</param>
        /// <param name="type">Target type.</param>
        /// <returns>The result of <c>JsonHelper.JsonTo</c>.</returns>
        protected virtual object Deserialize(string data, Type type)
        {
            return JsonHelper.JsonTo(data, type);
        }

        /// <summary>Raises <see cref="Receive"/>.</summary>
        protected virtual void OnReceive(TriggerReceiveEventArgs e)
        {
            Receive?.Invoke(this, e);
        }

        /// <summary>Raises <see cref="ReceiveError"/>.</summary>
        protected virtual void OnReceiveError(TriggerReceiveErrorEventArgs e)
        {
            ReceiveError?.Invoke(this, e);
        }

        /// <summary>Raises <see cref="SubscriberError"/>.</summary>
        protected virtual void OnSubscriberError(TriggerSubscriberErrorEventArgs e)
        {
            SubscriberError?.Invoke(this, e);
        }

        /// <summary>Raises <see cref="WatchError"/>.</summary>
        protected virtual void OnWatchError(TriggerWatchErrorEventArgs e)
        {
            WatchError?.Invoke(this, e);
        }

        /// <summary>
        /// Signals all workers to stop and blocks (polling every 100 ms) until each of them has exited.
        /// </summary>
        public void Stop()
        {
            _running = false;

            while (_executing > 0)
            {
                TaskHelper.Delay(100).Wait();
            }
        }

        /// <summary>Starts the background workers.</summary>
        /// <param name="paralleMax">Number of workers to launch.</param>
        /// <exception cref="InvalidOperationException">The subscriber is already running; call <see cref="Stop"/> first.</exception>
        public void Start(int paralleMax = 10)
        {
            lock (_sync)
            {
                if (_running)
                    throw new InvalidOperationException("必须先停止才能启动。");

                _running = true;
                // Publish the worker count BEFORE launching anything: a worker that exits early
                // decrements the counter, and assigning afterwards would overwrite that decrement
                // and leave Stop() waiting forever.
                _executing = paralleMax;
            }

            for (var i = 0; i < paralleMax; i++)
            {
                TaskHelper.Run(() =>
                {
                    try
                    {
                        while (_running)
                        {
                            try
                            {
                                WorkTask();
                            }
                            catch (Exception ex)
                            {
                                var errargs = new TriggerSubscriberErrorEventArgs
                                {
                                    Error = ex
                                };
                                OnSubscriberError(errargs);
                                if (errargs.ExitTask)
                                    break;
                            }
                        }
                    }
                    finally
                    {
                        // Always account for the exit, even when an error handler itself throws,
                        // otherwise Stop() would never observe zero.
                        lock (_sync)
                        {
                            _executing--;
                        }
                    }
                });
            }
        }

        /// <summary>
        /// Worker loop: reads one item per transaction, raises <see cref="Receive"/>, marks the item
        /// processed and commits. Runs until <see cref="Stop"/> is called or an exception escapes.
        /// </summary>
        private void WorkTask()
        {
            using (var container = _container.NewContainer())
            {
                var sql = container.Resolve<ISql>();
                var delay = 1;

                while (_running)
                {
                    using (var t = sql.BeginTransaction())
                    {
                        var revarg = new TriggerReceiveEventArgs
                        {
                            Item = ReadNextItem(sql)
                        };
                        if (revarg.Item != null && revarg.Item.Id != null)
                        {
                            // Work was found: reset the idle back-off.
                            delay = 1;
                            try
                            {
                                OnReceive(revarg);

                                // Leave the item unprocessed and retry immediately (skips the delay below).
                                // NOTE(review): relies on the transaction being rolled back when disposed
                                // without Commit - confirm against the ISql implementation.
                                if (revarg.Reconsume)
                                    continue;

                                UpdateProcessed(sql, revarg.Item.Id);

                                t.Commit();
                            }
                            catch (Exception ex)
                            {
                                var errargs = new TriggerReceiveErrorEventArgs
                                {
                                    Item = revarg.Item,
                                    Error = ex
                                };
                                OnReceiveError(errargs);
                            }
                        }
                        else if (delay < 10000)
                        {
                            // Nothing to do: double the wait until it passes the 10000 threshold.
                            delay *= 2;
                        }
                    }
                    TaskHelper.Delay(delay).Wait();
                }
            }
        }

        /// <summary>Marks the item with the given id as processed and stamps its process time.</summary>
        /// <param name="sql">Connection the update is executed on.</param>
        /// <param name="id">Id of the item to mark.</param>
        protected virtual void UpdateProcessed(ISql sql, string id)
        {
            // "AND Processed=0" keeps the update from touching a row that is already marked.
            var execsql =
                $"UPDATE {nameof(TriggerItem)} SET {nameof(TriggerItem.Processed)}=1,{nameof(TriggerItem.ProcessTime)}=@ptime WHERE {nameof(TriggerItem.Id)}=@id AND {nameof(TriggerItem.Processed)}=0";
            sql.Execute(execsql, new {id, ptime = DateTime.Now.ToLong()});
        }

        /// <summary>Reads one unprocessed item, or <c>null</c> when the query returns no row.</summary>
        /// <param name="sql">Connection the query is executed on.</param>
        /// <remarks>
        /// The query uses the <c>UPDLOCK, READPAST</c> table hints so that parallel workers skip rows
        /// another transaction has locked. It has no ORDER BY, so no particular order is requested.
        /// </remarks>
        protected virtual TriggerItem ReadNextItem(ISql sql)
        {
            return sql.Query<TriggerItem>($@"
                SELECT TOP 1 * FROM [{nameof(TriggerItem)}] WITH(UPDLOCK,READPAST)
                WHERE [{nameof(TriggerItem.Processed)}]=0")
                .FirstOrDefault();
        }

        /// <summary>
        /// Registers watch services. Concrete classes implementing <c>IWatchAfter</c> are notified for every
        /// item; classes implementing one of the generic watch interfaces are notified for the matching
        /// trigger type and table type. May be called repeatedly; registrations accumulate.
        /// </summary>
        /// <param name="types">Candidate service types; non-class, abstract and null entries are skipped.</param>
        /// <exception cref="ArgumentNullException"><paramref name="types"/> is null.</exception>
        public void LoadService(params Type[] types)
        {
            if (types == null)
                throw new ArgumentNullException(nameof(types));

            // Remove-then-add keeps OnWatch subscribed exactly once across repeated calls.
            Receive -= OnWatch;
            Receive += OnWatch;

            // OnWatch reads these collections from worker threads, so all mutation happens under the gate.
            lock (_sync)
            {
                foreach (var type in types)
                {
                    if (type == null || !type.IsClass || type.IsAbstract)
                        continue;
                    if (typeof(IWatchAfter).IsAssignableFrom(type))
                        _watchAllTypes.Add(type);

                    foreach (var it in type.GetInterfaces().Where(i => i.IsGenericType))
                    {
                        TriggerType triggerType;
                        if (!WatchInterfaces.TryGetValue(it.GetGenericTypeDefinition(), out triggerType))
                            continue;

                        // FullName is null for open generic parameters; skip only this interface
                        // instead of aborting the registration of everything that follows.
                        var tab = it.GetGenericArguments().First().FullName;
                        if (tab == null)
                            continue;

                        List<ServiceSetting> registered;
                        if (!_settings.TryGetValue(tab, out registered))
                        {
                            registered = new List<ServiceSetting>();
                            _settings.Add(tab, registered);
                        }
                        registered.Add(new ServiceSetting
                        {
                            ServiceType = type,
                            Type = triggerType
                        });
                    }
                }
            }
        }

        /// <summary>
        /// <see cref="Receive"/> handler installed by <see cref="LoadService"/>: resolves the entity and table
        /// types named by the item and invokes the registered services in parallel, each in its own container.
        /// Items without a trigger type, or whose types cannot be resolved, are ignored.
        /// </summary>
        private void OnWatch(object sender, TriggerReceiveEventArgs e)
        {
            if (e.Item.Type.HasValue == false)
                return;
            var etype = Type.GetType(e.Item.EntityType, false);
            if (etype == null)
                return;

            var tabType = Type.GetType(e.Item.TableType, false);
            if (tabType == null)
                return;
            var triggerType = e.Item.Type.Value;
            var operation = triggerType.ToDbExecuteOperation();

            // Snapshot the registrations under the gate so a concurrent LoadService cannot
            // mutate the collections while they are being enumerated.
            var key = tabType.FullName;
            Type[] watchAllTypes;
            Type[] list;
            lock (_sync)
            {
                watchAllTypes = _watchAllTypes.ToArray();
                List<ServiceSetting> registered;
                if (key != null && _settings.TryGetValue(key, out registered))
                    list = registered.Where(i => i.Type == triggerType).Select(i => i.ServiceType).ToArray();
                else
                    list = new Type[0];
            }

            if (watchAllTypes.Length > 0)
            {
                var watchAllMethod = typeof(IWatchAfter).GetMethods().First();

                Parallel.ForEach(watchAllTypes, servType =>
                {
                    using (var container = _container.NewContainer())
                    {
                        var service = container.Resolve(servType);
                        // Every service gets its own deserialized instance.
                        var obj = Deserialize(e.Item.Entity, etype);
                        var args = new object[] {operation, tabType, obj};
                        try
                        {
                            watchAllMethod.Invoke(service, args);
                        }
                        catch (Exception ex)
                        {
                            // Reflection wraps exceptions thrown by the service in TargetInvocationException.
                            OnWatchError(new TriggerWatchErrorEventArgs
                            {
                                Error = ex,
                                Service = service,
                                Type = triggerType,
                                Model = obj
                            });
                        }
                    }
                });
            }

            if (list.Length < 1)
                return;

            var itype = WatchInterfaces
                .Where(pair => pair.Value == triggerType)
                .Select(pair => pair.Key)
                .FirstOrDefault();
            if (itype == null)
                throw new NotSupportedException(triggerType.ToString());

            var watchMethod = itype.MakeGenericType(tabType).GetMethods().First();

            Parallel.ForEach(list, servType =>
            {
                using (var container = _container.NewContainer())
                {
                    var service = container.Resolve(servType);
                    var obj = Deserialize(e.Item.Entity, etype);
                    var args = new[] {obj};
                    if (obj is UpdateArgs)
                    {
                        // An UpdateArgs payload is unpacked into two arguments: its Sets and its Where.
                        var update = (UpdateArgs) obj;
                        args = new object[] {update.Sets, update.Where};
                    }
                    try
                    {
                        watchMethod.Invoke(service, args);
                    }
                    catch (Exception ex)
                    {
                        // Reflection wraps exceptions thrown by the service in TargetInvocationException.
                        OnWatchError(new TriggerWatchErrorEventArgs
                        {
                            Error = ex,
                            Service = service,
                            Type = triggerType,
                            Model = obj,
                            WatchType = tabType
                        });
                    }
                }
            });
        }

        /// <summary>One registration: a service type and the trigger type it wants to be notified about.</summary>
        private class ServiceSetting
        {
            public Type ServiceType { get; set; }
            public TriggerType Type { get; set; }
        }
    }
}



